summaryrefslogtreecommitdiffstats
path: root/glucometerutils/drivers/otverioiq.py
diff options
context:
space:
mode:
Diffstat (limited to 'glucometerutils/drivers/otverioiq.py')
-rw-r--r--glucometerutils/drivers/otverioiq.py37
1 files changed, 22 insertions, 15 deletions
diff --git a/glucometerutils/drivers/otverioiq.py b/glucometerutils/drivers/otverioiq.py
index 69f88de..b4a4a6c 100644
--- a/glucometerutils/drivers/otverioiq.py
+++ b/glucometerutils/drivers/otverioiq.py
@@ -16,7 +16,9 @@ auto-detected.
"""
import binascii
+import datetime
import logging
+from typing import Any, Dict, Generator, Optional
import construct
@@ -101,18 +103,18 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
DEFAULT_CABLE_ID = "10c4:85a7" # Specific ID for embedded cp210x
TIMEOUT = 0.5
- def __init__(self, device):
- super(Device, self).__init__(device)
+ def __init__(self, device: Optional[str]) -> None:
+ super().__init__(device)
self.buffered_reader_ = construct.Rebuffered(_PACKET, tailcutoff=1024)
- def _send_packet(self, message):
+ def _send_packet(self, message: bytes) -> None:
pkt = _PACKET.build({"data": {"value": {"message": message}}})
logging.debug("sending packet: %s", binascii.hexlify(pkt))
self.serial_.write(pkt)
self.serial_.flush()
- def _read_packet(self):
+ def _read_packet(self) -> construct.Container:
raw_pkt = self.buffered_reader_.parse_stream(self.serial_).data
logging.debug("received packet: %r", raw_pkt)
@@ -121,7 +123,12 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
return pkt
- def _send_request(self, request_format, request_obj, response_format):
+ def _send_request(
+ self,
+ request_format: construct.struct,
+ request_obj: Optional[Dict[str, Any]],
+ response_format: construct.Struct,
+ ) -> construct.Container:
try:
request = request_format.build(request_obj)
self._send_packet(request)
@@ -132,7 +139,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
except construct.ConstructError as e:
raise lifescan.MalformedCommand(str(e))
- def get_meter_info(self):
+ def get_meter_info(self) -> common.MeterInfo:
return common.MeterInfo(
"OneTouch Verio IQ glucometer",
serial_number=self.get_serial_number(),
@@ -140,47 +147,47 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
native_unit=self.get_glucose_unit(),
)
- def get_version(self):
+ def get_version(self) -> str:
response = self._send_request(_VERSION_REQUEST, None, _VERSION_RESPONSE)
return response.version
- def get_serial_number(self):
+ def get_serial_number(self) -> str:
response = self._send_request(
_SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE
)
return response.serial_number
- def get_datetime(self):
+ def get_datetime(self) -> datetime.datetime:
response = self._send_request(_READ_RTC_REQUEST, None, _READ_RTC_RESPONSE)
return response.timestamp
- def _set_device_datetime(self, date):
+ def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime:
self._send_request(_WRITE_RTC_REQUEST, {"timestamp": date}, _COMMAND_SUCCESS)
# The device does not return the new datetime, so confirm by calling
# READ RTC again.
return self.get_datetime()
- def zero_log(self):
+ def zero_log(self) -> None:
self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS)
- def get_glucose_unit(self):
+ def get_glucose_unit(self) -> common.Unit:
response = self._send_request(
_GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE
)
return response.unit
- def _get_reading_count(self):
+ def _get_reading_count(self) -> int:
response = self._send_request(
_READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE
)
return response.count
- def _get_reading(self, record_id):
+ def _get_reading(self, record_id: int) -> Optional[common.GlucoseReading]:
response = self._send_request(
_READ_RECORD_REQUEST, {"record_id": record_id}, _READING_RESPONSE
)
@@ -193,7 +200,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
response.timestamp, float(response.value), meal=response.meal
)
- def get_readings(self):
+ def get_readings(self) -> Generator[common.AnyReading, None, None]:
record_count = self._get_reading_count()
for record_id in range(record_count):
reading = self._get_reading(record_id)